package com.pms.utils;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.pms.constant.MQConstant;
import com.pms.entity.BaseModel;
import com.pms.mapper.IotWaterPumpGroupAttrMapper;
import com.pms.mapper.WaterPumpGroupAttrMapper;
import com.pms.mq.RabbitProducerTest;
import com.pms.service.IIotWaterPumpGroupService;
import com.pms.service.IWaterPumpGroupService;
import com.pms.util.DateUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.StringTokenizer;

/**
 * RabbitMQ consumer that stores IoT water-pump reports.
 *
 * <p>Each message is expected to be a JSON object containing:
 * <ul>
 *   <li>{@code body.metadata.cr_device_status} - the device status;</li>
 *   <li>{@code requestID} - a {@code /}-separated string whose first token is the device name
 *       and whose second token is the report timestamp;</li>
 *   <li>{@code tsdb.data} - an array of {@code {name, val}} attribute objects.</li>
 * </ul>
 * The attributes are converted with the per-device rules returned by
 * {@link IIotWaterPumpGroupService#getMessageFormatRuleJson}, written to
 * {@code iot_water_pump_group_attr}, and, when the {@code ZXGZ} attribute is non-zero, an
 * {@code iot_alarm_record} row is written as well and the report is pushed to the socket and
 * SMS queues.
 *
 * <p>NOTE(review): the SQL is still assembled as text because the mapper only accepts a raw
 * statement; identifiers and the device name coming from the message are validated before they
 * are concatenated. Moving to a parameterized mapper method would be the proper long-term fix.
 */
@Component
public class MQWaterPumpMsgToMysql {
    private static final Logger logger = LoggerFactory.getLogger(MQWaterPumpMsgToMysql.class);

    /** Pattern used for every formatted timestamp written by this class. */
    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
    /** Name of the reported attribute that carries the fault code. */
    private static final String FAULT_ATTR_NAME = "ZXGZ";
    /** Binary conversions are left-padded with zeros to this many digits. */
    private static final int BINARY_WIDTH = 16;
    /** Number of two-character groups read by the {@code compareChar} read mode. */
    private static final int PUMP_STATE_SLOTS = 8;

    // Not referenced by this class; kept so the bean wiring stays unchanged.
    @Autowired
    private WaterPumpGroupAttrMapper waterPumpAttrMapper;
    @Autowired
    private IotWaterPumpGroupAttrMapper iotwaterPumpAttrMapper;
    @Autowired
    private IIotWaterPumpGroupService waterPumpService;
    @Autowired
    RabbitProducerTest rabbitProducerTest;

    /**
     * Consumes one message from the IoT MQTT storage queue.
     *
     * <p>Blank messages and messages that do not start with <code>{</code> are only logged.
     * {@link NumberFormatException} and {@link JSONException} raised while handling the report
     * are logged together with the message and not rethrown; any other exception (for example
     * from the mapper) still propagates to the listener container.
     *
     * @param message raw message text taken from the queue
     */
    @RabbitListener(queues = MQConstant.QUEUE_NAME_IOT_MQTT_STORAGE)
    public void iotMqttStorageQueueMessageSubscribe(String message) {
        if (StringUtils.isBlank(message) || message.indexOf("{") != 0) {
            logger.info("mqtt上传的消息为: {}", message);
            return;
        }
        try {
            storeReport(message);
        } catch (NumberFormatException ex) {
            logger.warn("数据格式转换异常,异常信息为:{}", ex.getMessage(), ex);
            logger.warn("mqtt上传的消息为: {}", message);
        } catch (JSONException ex) {
            logger.warn("Json异常,异常信息为:{}", ex.getMessage(), ex);
            logger.warn("mqtt上传的消息为: {}", message);
        }
    }

    /** Parses one report, writes it through the mapper and pushes fault notifications. */
    private void storeReport(String message) {
        JSONObject report = JSONObject.parseObject(message);

        // A report without body.metadata or tsdb.data cannot be stored; drop it with a log entry
        // instead of failing with a NullPointerException inside the listener.
        JSONObject body = report.getJSONObject("body");
        JSONObject metadata = body == null ? null : body.getJSONObject("metadata");
        if (metadata == null) {
            logger.warn("kafka上报的信息:缺少body.metadata, 消息为: {}", message);
            return;
        }
        Integer deviceStatus = metadata.getInteger("cr_device_status");

        String requestId = report.getString("requestID");
        if (StringUtils.isBlank(requestId)) {
            logger.info("kafka上报的信息:设备名称获取为空");
            return;
        }
        List<String> requestIdParts = splitRequestId(requestId);
        if (requestIdParts.size() <= 1) {
            logger.info("kafka上报的信息:上报时间获取为空");
            return;
        }
        String deviceName = requestIdParts.get(0);
        if (!isSafeSqlText(deviceName)) {
            // The name is embedded in a quoted SQL literal below.
            logger.warn("kafka上报的信息:设备名称包含非法字符, 消息为: {}", message);
            return;
        }

        JSONObject tsdb = report.getJSONObject("tsdb");
        JSONArray dataArray = tsdb == null ? null : tsdb.getJSONArray("data");
        if (dataArray == null) {
            logger.warn("kafka上报的信息:缺少tsdb.data, 消息为: {}", message);
            return;
        }

        long rawReportTime = Long.parseLong(requestIdParts.get(1));
        // If twice the reported value is still below the current epoch milliseconds, the value
        // is taken to be in seconds and is scaled to milliseconds.
        long reportTimeMillis = rawReportTime;
        if ((rawReportTime << 1) < System.currentTimeMillis()) {
            reportTimeMillis = rawReportTime * 1000;
        }
        // Formatted once so the SQL rows and the pushed JSON carry the same creation time.
        String createTime = DateUtil.format(new Date(), DATE_TIME_PATTERN);
        String reportTimeFormat = DateUtil.longToString(reportTimeMillis, DATE_TIME_PATTERN);

        Long waterPumpGroupId = BaseModel.returnStaticIdLong();
        StringBuilder columns = new StringBuilder(80);
        StringBuilder values = new StringBuilder(100);
        JSONObject attrJson = new JSONObject();

        columns.append("waterPumpGroupId,deviceStatus,deviceName,reportTime,createTime,reportTimeFormat");
        values.append(waterPumpGroupId);
        values.append(",").append(deviceStatus);
        values.append(",").append("'").append(deviceName).append("'");
        // The SQL row keeps the timestamp as reported; the JSON carries the millisecond value.
        values.append(",").append(rawReportTime);
        values.append(",").append("'").append(createTime).append("'");
        values.append(",").append("'").append(reportTimeFormat).append("'");

        attrJson.put("waterPumpGroupId", waterPumpGroupId);
        attrJson.put("deviceStatus", deviceStatus);
        attrJson.put("deviceName", deviceName);
        attrJson.put("reportTime", reportTimeMillis);
        attrJson.put("createTime", createTime);
        // The surrounding quotes are part of the stored value, as before.
        attrJson.put("reportTimeFormat", "'" + reportTimeFormat + "'");

        // Conversion rules are looked up by device name.
        JSONObject ruleJson = waterPumpService.getMessageFormatRuleJson(deviceName);
        boolean hasFault = false;
        for (int i = 0, size = dataArray.size(); i < size; i++) {
            Object element = dataArray.get(i);
            if (!(element instanceof JSONObject)) {
                logger.warn("kafka上报的信息:tsdb.data元素格式错误: {}", element);
                continue;
            }
            JSONObject item = (JSONObject) element;
            String name = item.getString("name");
            if (!isSafeColumnName(name)) {
                // The attribute name is used as a column name, so it must be a plain identifier.
                logger.warn("kafka上报的信息:属性名称非法: {}", name);
                continue;
            }
            Integer rawValue = item.getInteger("val");
            Object value = kafkaMessageFormat(name, rawValue, ruleJson);
            columns.append(",").append(name);
            values.append(",").append(value);
            attrJson.put(name, value);
            // The fault check uses the raw reported code, not the converted value.
            if (FAULT_ATTR_NAME.equals(name) && rawValue != null && rawValue != 0) {
                hasFault = true;
            }
        }

        int faultFlag = hasFault ? 1 : 0;
        columns.append(",").append("isFault");
        values.append(",").append(faultFlag);
        attrJson.put("isFault", faultFlag);

        StringBuilder sql = new StringBuilder(300);
        sql.append("insert into iot_water_pump_group_attr(");
        sql.append(columns);
        sql.append(") values(");
        sql.append(values);
        sql.append(");");
        if (hasFault) {
            appendAlarmInsert(sql, attrJson, deviceName, createTime);
        }
        iotwaterPumpAttrMapper.insertKafkaData(sql.toString());

        if (hasFault) {
            // Pushed only after the mapper call returned without throwing.
            String pushIotMessage = attrJson.toJSONString();
            rabbitProducerTest.SocketQueuePushMessage(pushIotMessage);
            rabbitProducerTest.SMSQueuePushMessage(pushIotMessage);
        }
    }

    /** Appends the {@code iot_alarm_record} insert statement for a faulty report to {@code sql}. */
    private void appendAlarmInsert(StringBuilder sql, JSONObject attrJson, String deviceName, String createTime) {
        int alarmType = attrJson.getIntValue(FAULT_ATTR_NAME);
        sql.append("insert into iot_alarm_record(");
        // `describe` is a reserved word in MySQL and has to be quoted to be used as a column name.
        sql.append("id,alarm_type,`describe`,group_id,creat_time,group_status,communication_status,is_reade");
        sql.append(") value(");
        sql.append(BaseModel.returnStaticIdLong());
        sql.append(",").append(alarmType);
        sql.append(",").append(gzDescribe(alarmType));
        sql.append(",").append(waterPumpService.getGroupIdByCode(deviceName));
        sql.append(",").append("'").append(createTime).append("'");
        sql.append(",").append(0); // group_status: 0 = abnormal, 1 = normal
        sql.append(",").append(0); // communication_status: 0 = abnormal, 1 = normal
        sql.append(",").append(2); // is_reade: 1 = read, 2 = unread
        sql.append(");");
    }

    /** Splits a request id on {@code /}, skipping empty tokens. */
    private static List<String> splitRequestId(String requestId) {
        StringTokenizer tokenizer = new StringTokenizer(requestId, "/");
        List<String> parts = new ArrayList<String>();
        while (tokenizer.hasMoreTokens()) {
            parts.add(tokenizer.nextToken());
        }
        return parts;
    }

    /** Returns true when {@code name} consists only of letters, digits, {@code _} or {@code $}. */
    private static boolean isSafeColumnName(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            if (!Character.isLetterOrDigit(c) && c != '_' && c != '$') {
                return false;
            }
        }
        return true;
    }

    /** Returns true when {@code text} cannot terminate or escape a single-quoted SQL literal. */
    private static boolean isSafeSqlText(String text) {
        return text != null && text.indexOf('\'') < 0 && text.indexOf('\\') < 0;
    }

    /**
     * Converts one reported attribute value according to the rule configured for its key.
     *
     * <p>The rule is read from {@code rulesJson.get(key)}; its {@code way} entry selects the
     * conversion: {@code add}, {@code sub}, {@code ride} (multiply) and {@code divide} combine
     * the value with the rule's numeric {@code val}; {@code hexadecimal} renders the value in
     * base 2, 8 or 16 (selected by {@code val}) and then reads it as configured by
     * {@code isReversal}, {@code readWay}, {@code readVal} and {@code indexJson}.
     *
     * @param key       attribute name, used to look up the rule
     * @param value     reported value; the arithmetic and radix conversions require an
     *                  {@link Integer}
     * @param rulesJson rules keyed by attribute name; may be {@code null}
     * @return the unchanged {@code value} when it is {@code null}, when no rules or no rule for
     *         the key exist, or when {@code way} is blank, {@code *} or unknown; a
     *         {@link Double} for the arithmetic rules; a single-quoted string for the
     *         {@code hexadecimal} rule; {@code null} when the conversion fails
     */
    public Object kafkaMessageFormat(String key, Object value, JSONObject rulesJson) {
        if (value == null || rulesJson == null) {
            // No rules at all is treated like "no rule for this key": keep the raw value.
            return value;
        }
        try {
            JSONObject rule = rulesJson.getJSONObject(key);
            if (rule == null || rule.isEmpty()) {
                return value;
            }
            String way = rule.getString("way");
            if (StringUtils.isBlank(way) || "*".equals(way)) {
                return value;
            }
            if ("add".equals(way)) {
                return (Integer) value + rule.getDoubleValue("val");
            }
            if ("sub".equals(way)) {
                return (Integer) value - rule.getDoubleValue("val");
            }
            if ("ride".equals(way)) {
                return (Integer) value * rule.getDoubleValue("val");
            }
            if ("divide".equals(way)) {
                double divisor = rule.getDoubleValue("val");
                if (divisor == 0) {
                    // Infinity/NaN cannot be written into the SQL text.
                    logger.warn("{}转换值有异常:除数为0", key);
                    return null;
                }
                return (Integer) value / divisor;
            }
            if ("hexadecimal".equals(way)) {
                return applyRadixRule(key, (Integer) value, rule);
            }
            return value; // no rule matched: keep the raw value
        } catch (Exception ex) {
            logger.warn("{}转换值有异常:{}", key, ex.getMessage(), ex);
            return null;
        }
    }

    /** Applies a {@code hexadecimal} rule; returns the quoted result or {@code null} on failure. */
    private Object applyRadixRule(String key, int value, JSONObject rule) {
        String digits = toRadixString(value, rule.getString("val"));
        if (digits == null) {
            logger.warn("{}转换值有异常:不支持的进制 {}", key, rule.getString("val"));
            return null;
        }
        if ("1".equals(rule.getString("isReversal"))) {
            digits = new StringBuilder(digits).reverse().toString();
        }

        StringBuilder result = new StringBuilder();
        String readWay = rule.getString("readWay");
        if ("index".equals(readWay)) {
            String readVal = rule.getString("readVal");
            if (readVal == null) {
                logger.warn("{}转换值有异常:缺少readVal", key);
                return null;
            }
            appendMatchPositions(result, digits, readVal, rule.getJSONObject("indexJson"));
        } else if ("compareChar".equals(readWay)) {
            if (digits.length() < PUMP_STATE_SLOTS * 2) {
                logger.warn("{}转换值有异常:位数不足, 值为 {}", key, digits);
                return null;
            }
            appendPumpStates(result, digits);
        }

        if ("null".equals(result.toString())) {
            logger.error("转换出来的消息为null,打印数据为:{}={}", key, value);
            return null;
        }
        return "'" + result + "'";
    }

    /**
     * Renders {@code value} in base 2 (left-padded with zeros to 16 digits), 8 or 16.
     *
     * @return the digit string, or {@code null} when {@code radix} is none of "2", "8", "16"
     */
    private static String toRadixString(int value, String radix) {
        if ("2".equals(radix)) {
            String binary = Integer.toBinaryString(value);
            StringBuilder padded = new StringBuilder();
            for (int i = binary.length(); i < BINARY_WIDTH; i++) {
                padded.append('0');
            }
            return padded.append(binary).toString();
        }
        if ("8".equals(radix)) {
            return Integer.toOctalString(value);
        }
        if ("16".equals(radix)) {
            return Integer.toHexString(value);
        }
        return null;
    }

    /**
     * Appends, comma-separated, every position at which {@code readVal} occurs in {@code digits}
     * (or {@code -1} when it does not occur). When {@code indexJson} is non-empty each position
     * is replaced by the value mapped to it.
     */
    private static void appendMatchPositions(StringBuilder out, String digits, String readVal, JSONObject indexJson) {
        int last = digits.lastIndexOf(readVal);
        int index = digits.indexOf(readVal);
        out.append(positionLabel(index, indexJson));
        while (index < last) {
            index = digits.indexOf(readVal, index + 1);
            out.append(",").append(positionLabel(index, indexJson));
        }
    }

    /** Returns the text for one position: the mapped value if a mapping is configured, else the index. */
    private static String positionLabel(int index, JSONObject indexJson) {
        if (indexJson == null || indexJson.isEmpty()) {
            return String.valueOf(index);
        }
        return String.valueOf(indexJson.get(String.valueOf(index)));
    }

    /**
     * Reads the first eight two-character groups of {@code digits} as pump states and appends
     * one code per group, each followed by a comma.
     */
    private static void appendPumpStates(StringBuilder out, String digits) {
        for (int slot = 0; slot < PUMP_STATE_SLOTS; slot++) {
            String pair = digits.substring(2 * slot, 2 * (slot + 1));
            if ("00".equals(pair)) {
                out.append("2"); // not running
            } else if ("01".equals(pair)) {
                out.append("1"); // running at mains frequency
            } else if ("10".equals(pair)) {
                out.append("0"); // running at variable frequency
            } else {
                out.append("4"); // not in use
            }
            out.append(",");
        }
    }

    /**
     * Returns the single-quoted description for a fault code.
     *
     * <p>Codes: 1 - low water level, 2 - inverter fault, 3 - over-pressure alarm,
     * 4 - sensor fault, 5 - self-check fault.
     *
     * @param gzCode fault code; may be {@code null}
     * @return the quoted description, or the quoted "unknown fault" text for {@code null} and
     *         for any other code
     */
    public String gzDescribe(Integer gzCode) {
        if (gzCode == null) {
            return "'未知故障'";
        }
        switch (gzCode) {
            case 1:
                return "'水位低'";
            case 2:
                return "'变频器故障'";
            case 3:
                return "'超压报警'";
            case 4:
                return "'传感器故障'";
            case 5:
                return "'自检故障'";
            default:
                return "'未知故障'";
        }
    }
}
